/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jclouds.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * An {@link ExecutorService} that executes each submitted task using one of possibly several pooled threads, normally
 * configured using {@link DynamicExecutors} factory methods.
 */
class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
   /**
    * number of threads that are actively executing tasks
    */
   private final AtomicInteger activeCount = new AtomicInteger();

   DynamicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
         BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
   }

   @Override
   public int getActiveCount() {
      return activeCount.get();
   }

   @Override
   protected void beforeExecute(Thread t, Runnable r) {
      activeCount.incrementAndGet();
   }

   @Override
   protected void afterExecute(Runnable r, Throwable t) {
      activeCount.decrementAndGet();
   }

   /**
    * Much like a {@link SynchronousQueue} which acts as a rendezvous channel. It is well suited for handoff designs, in
    * which a tasks is only queued if there is an available thread to pick it up.
    * <p/>
    * This queue is correlated with a thread-pool, and allows insertions to the queue only if there is a free thread
    * that can poll this task. Otherwise, the task is rejected and the decision is left up to one of the
    * {@link RejectedExecutionHandler} policies:
    * <ol>
    * <li> {@link ForceQueuePolicy} - forces the queue to accept the rejected task.</li>
    * <li> {@link TimedBlockingPolicy} - waits for a given time for the task to be executed.</li>
    * </ol>
    * 
    */
   static class DynamicQueue<E> extends LinkedBlockingQueue<E> {
      private static final long serialVersionUID = 1L;

      /**
       * The executor this Queue belongs to
       */
      private transient ThreadPoolExecutor executor;

      /**
       * Creates a <tt>DynamicQueue</tt> with a capacity of {@link Integer#MAX_VALUE}.
       */
      public DynamicQueue() {
         super();
      }

      /**
       * Creates a <tt>DynamicQueue</tt> with the given (fixed) capacity.
       * 
       * @param capacity
       *           the capacity of this queue.
       */
      DynamicQueue(int capacity) {
         super(capacity);
      }

      /**
       * Sets the executor this queue belongs to.
       */
      public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
         this.executor = executor;
      }

      /**
       * Inserts the specified element at the tail of this queue if there is at least one available thread to run the
       * current task. If all pool threads are actively busy, it rejects the offer.
       * 
       * @param o
       *           the element to add.
       * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt>
       * @see ThreadPoolExecutor#execute(Runnable)
       */
      @Override
      public boolean offer(E o) {
         int allWorkingThreads = executor.getActiveCount() + super.size();
         return allWorkingThreads < executor.getPoolSize() && super.offer(o);
      }
   }

   /**
    * A handler for rejected tasks that adds the specified element to this queue, waiting if necessary for space to
    * become available.
    */
   static class ForceQueuePolicy implements RejectedExecutionHandler {
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
         if (executor.isShutdown()) {
            throw new RejectedExecutionException("Rejected execution of task [" + r.getClass()
                  + "] since the executor is shutdown.");
         }
         try {
            executor.getQueue().put(r);
         } catch (InterruptedException e) {
            // should never happen since we never wait
            throw new RejectedExecutionException(e);
         }
      }
   }

   /**
    * A handler for rejected tasks that inserts the specified element into this queue, waiting if necessary up to the
    * specified wait time for space to become available.
    */
   static class TimedBlockingPolicy implements RejectedExecutionHandler {
      private final long waitTime;

      /**
       * @param waitTime
       *           wait time in milliseconds for space to become available.
       */
      TimedBlockingPolicy(long waitTime) {
         this.waitTime = waitTime;
      }

      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
         if (executor.isShutdown()) {
            throw new RejectedExecutionException("Rejected execution of task [" + r.getClass()
                  + "] since the executor is shutdown.");
         }
         try {
            boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
            if (!successful)
               throw new RejectedExecutionException("Rejected execution after waiting " + waitTime + " ms for task ["
                     + r.getClass() + "] to be executed.");
         } catch (InterruptedException e) {
            throw new RejectedExecutionException(e);
         }
      }
   }
}
